Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle(while:on:) #58

Merged
merged 8 commits into from
Nov 29, 2016
Merged

Add throttle(while:on:) #58

merged 8 commits into from
Nov 29, 2016

Conversation

sharplet
Copy link
Contributor

@sharplet sharplet commented Oct 6, 2016

There are times where you'd like to throttle values whenever some condition is met, rather than based on a time interval.

This is inspired by ReactiveViewModel's -throttleSignalWhileInactive:.

Basic usage might involve a simple MutableProperty to toggle throttled values:

let isActive = MutableProperty(false)

label.reactive.text <~ somethingChanged
  .throttle(while: isActive.map(!), on: QueueScheduler())
  .flatMap(.latest, transform: doSomethingExpensive)
  .observeOn(UIScheduler())

// later

isActive.value = true

At some point it would be great to be able to throttle events while the view has disappeared:

somethingChanged
  .throttle(while: reactive.viewHasDisappeared, on: UIScheduler())
  .startWithValues {
    // expensive layout stuff
  }

I'd love some feedback on this, especially potential thread safety issues! For example, I'm a little concerned that shouldThrottle could technically have changed between reading shouldThrottle.value and starting shouldThrottle.producer.uniqueValues { $0 }.

@sharplet
Copy link
Contributor Author

sharplet commented Oct 6, 2016

There are also no tests yet, which I'm planning to rectify. I should be able to cover the main semantics, although I don't trust I can write tests to verify this code is thread-safe.

Copy link
Contributor

@mdiep mdiep left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some preliminary thoughts. The idea makes sense to me overall. 👍

/// - scheduler: A scheduler to deliver events on.
///
/// - returns: A signal that sends values only while `shouldThrottle` is false.
func throttle<P: PropertyProtocol>(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> Signal<Value, Error>
Copy link
Contributor

@mdiep mdiep Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it should take a Signal, not a Property. 😕

It should be easy to go from the property to the signal, but it's not easy to go the other way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think I kinda see it the other way. If this was implemented in terms of Signal, we'd have to pick a default starting state for the throttle, probably .resumed so that events are passed through. Using Property means we don't have to make that assumption.

From a slightly different perspective, Property is more powerful than Signal in that you could implement a signal-based overload of this method by composing it with an initial value and calling through to the property-based implementation.

I pushed 4dcd2bf to demonstrate what I mean. If you think those overloads would be useful I'll add some docs and merge those changes in here.

disposable += schedulerDisposable
disposable += { _ = shouldThrottle }

disposable += shouldThrottle.producer.uniqueValues { $0 }.startWithValues { shouldThrottle in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably meant skipRepeats here, not uniqueValues?

Also, this should be formatted according to the RAC style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

} else {
if case .throttled(let value?) = state {
schedulerDisposable.innerDisposable = scheduler.schedule {
observer.send(value: value)
Copy link
Contributor

@mdiep mdiep Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this value should get sent outside of the Atomic's lock to prevent deadlock.

Even though it's in a schedule block, that can execute synchronously if, e.g., you're on the main queue and you're using the UIScheduler.

Same goes for the send below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change in e2dd090, let me know what you think.

} else {
return false
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be return self == .terminated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That requires explicit Equatable conformance of ThrottleWhileState since case throttled has an associated value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistically, this could also become a switch statement if you prefer, which would add exhaustively checking.

Copy link
Member

@NachoSoto NachoSoto Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which would add exhaustively checking.

And fail to compile if one adds a new case, which is a good thing

}
return .resumed
}
}
Copy link
Member

@ikesyo ikesyo Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value from an action of Atomic.modify is not a new value for the Atomic instance since ReactiveCocoa/ReactiveCocoa#2984. So this should be written as follows:

let valueToSend = state.modify { state -> Value? in
    guard !state.isTerminated else { return nil }

    if shouldThrottle {
        state = .throttled(nil)
        return nil
    } else {
        state = .resumed
        if case let .throttled(value) = state {
            return value
        } else {
            return nil
        }
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! I've pushed 58e5bed. I ended up having a single return nil after the conditional block, but if everyone prefers this style (a return at every branch) I can change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up having a single return nil after the conditional block

👍

eventToSend = event
return .terminated
}
}
Copy link
Member

@ikesyo ikesyo Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as https://github.com/ReactiveCocoa/ReactiveSwift/pull/58/files#r82397528:

let eventToSend = state.modify { state -> Event<Value, Error>? in
    switch event {
    case let .value(value):
        switch state {
        case .throttled:
            state = .throttled(value)
            return nil
        case .resumed:
            return event
        case .terminated:
            return nil
        }

    case .completed, .interrupted, .failed:
        state = .terminated
        return event
    }
}

} else {
state = .resumed

if case .throttled(let value?) = state {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to unwrap the bound value with ? since the return type is Value?: case .throttled(let value).

Copy link
Member

@ikesyo ikesyo Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consistently place let right after the case: if case let .throttled(value).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved the let outside 👍

You don't need to unwrap the bound value with ? since the return type is Value?

While that's technically true, I think there's value in being precise with exactly the conditions where this branch should be followed. The idea is that we should only send value event if a value was actually throttled, and I feel like this version communicates that more clearly. What do you think?

disposable += self.observe { event in
let eventToSend = state.modify { state -> Event<Value, Error>? in
switch event {
case .value(let value):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state = .throttled(value)
return nil
case .resumed:
return .value(value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could return the original event here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@sharplet sharplet force-pushed the as-throttle-while branch 2 times, most recently from ee9260f to c0367c3 Compare October 12, 2016 14:26
@sharplet
Copy link
Contributor Author

Ok, I think I've addressed everyone's feedback, thank you! I've also pushed some tests.

@mdiep Do you think it's worth adding the Signal and SignalProducer overloads like I mentioned in #58 (comment)?

@mdiep
Copy link
Contributor

mdiep commented Nov 6, 2016

Do you think it's worth adding the Signal and SignalProducer overloads like I mentioned in #58

I think we can go without them for now.

Mind merging master in? That should fix up the SwiftPM tests.

@sharplet
Copy link
Contributor Author

sharplet commented Nov 6, 2016

And we're green 👍

where P.Value == Bool
{
return Signal { observer in
let initial: ThrottleWhileState<Value> = shouldThrottle.value ? .throttled(nil) : .resumed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initial check seems unnecessary, since shouldThrottle.producer below would emit the current value.

@sharplet
Copy link
Contributor Author

@andersio Is this kind of thing what you had in mind? I'm trying to work out how to safely get the initial value, and also not miss any changes to shouldThrottle...

diff --git a/Sources/Signal.swift b/Sources/Signal.swift
index cb3adbf..941bf79 100644
--- a/Sources/Signal.swift
+++ b/Sources/Signal.swift
@@ -1515,7 +1515,8 @@ extension SignalProtocol {
        where P.Value == Bool
    {
        return Signal { observer in
-           let initial: ThrottleWhileState<Value> = shouldThrottle.value ? .throttled(nil) : .resumed
+           let shouldThrottleProducer = shouldThrottle.producer.replayLazily(upTo: Int.max)
+           let initial: ThrottleWhileState<Value> = shouldThrottleProducer.first()!.value! ? .throttled(nil) : .resumed
            let state = Atomic(initial)
            let schedulerDisposable = SerialDisposable()

@@ -1523,7 +1524,8 @@ extension SignalProtocol {
            disposable += schedulerDisposable
            disposable += { _ = shouldThrottle }

-           disposable += shouldThrottle.producer
+           disposable += shouldThrottleProducer
+               .skip(first: 1)
                .skipRepeats()
                .startWithValues { shouldThrottle in
                    let valueToSend = state.modify { state -> Value? in

@andersio
Copy link
Member

andersio commented Nov 16, 2016

@sharplet I mean the reverse of this. initial can be simply .resumed. Starting shouldThrottle.producer would always give the current value of the property synchronously, so there is no need to grab it twice.

@sharplet
Copy link
Contributor Author

Oh, that totally clears things up. Fixed in f757e53 (and rebased).


let disposable = CompositeDisposable()
disposable += schedulerDisposable
disposable += { _ = shouldThrottle }
Copy link
Member

@andersio andersio Nov 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sharplet Hmm, may I ask what's the specific reason to retain it? I don't think the resulting signal should own shouldThrottle in any way.

What if it terminates the resulting signal when shouldThrottle completes, gaining the take(during:) semantic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind it's the same reason that Action captures a strong reference to its enabledIf property. So to me it seems convenient, not having to maintain an extra strong reference to that property.

But that doesn't help me to understand what the problem with that strong reference is.

What if it terminates the resulting signal when shouldThrottle completes, gaining the take(during:) semantic?

Regardless of the current behaviour, are you saying that if shouldThrottle completes, the returned signal should or should not also complete? I think it would make sense to add some tests for these semantics so it's clear what we want the behaviour to be.

Copy link
Member

@andersio andersio Nov 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An observation to the push-based Signal should not have any effect on the event stream. If we treat operator as specialised observations, and Propertys as specialised Signals with stronger guarantees, by the logic a signal operator that takes a Property should not cause any side effect on its sources.

Now retaining the property violates this, as the retainment influences the termination of the stream (the property) that is supposed to just be observed.

IIRC this would be the first ever signal operator that has a side effect upon the upstream signal if merged.

Action is a different story, since it is not a stream of value itself, but a state machine that owns a few streams.

Regardless of the current behaviour, are you saying that if shouldThrottle completes, the returned signal should or should not also complete?

Either would do, but completing with shouldThrottle's completion sounds a fit. Say for a Swift equivalent of ReactiveViewModel, I think it makes sense to have these throttled signals gone together with the supplied shouldThrottle property (which is owned by the view model).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me like the options are:

  1. When shouldThrottle completes, the signal may get stuck in the throttled state (i.e., whichever state was last)
  2. When shouldThrottle completes, the state gets switched back to .resumed
  3. When shouldThrottle completes, the signal completes

(2) feels plain wrong to me, which makes it a toss-up between (1) and (3). So I feel like it comes down to which option is more surprising, and which option is easier to see/debug. For that reason I'm leaning towards tying the lifetime of the returned signal to the lifetime of shouldThrottle.

@mdiep What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or anyone else who's been involved in the thread of course! 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) seems like the right thing to me, although it's a tough call.

That seems like the least surprising thing, and most in line with other operators. (c.f. sample, which only completes when both signals have completed)

We should definitely add tests for this—and probably for an interrupt on the signal as well.

@andersio andersio added this to the 1.0 milestone Nov 18, 2016
public func throttle<P: PropertyProtocol>(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> SignalProducer<Value, Error>
where P.Value == Bool
{
return lift { $0.throttle(while: shouldThrottle, on: scheduler) }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps a strong reference to shouldThrottle for SignalProducers. @andersio does this need to change as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first thing I tried was to wrap shouldThrottle in an existential:

let shouldThrottle = Property(shouldThrottle)
return lift { $0.throttle(while: shouldThrottle, on: scheduler) }

But that also increases the lifetime of the underlying property. Is there any way to capture a reference to this property that won't extend its lifetime? It looks like composing properties fundamentally alters the lifetime of the source property, extending it to be the union of all transitive property lifetimes. Is that intentional?

Copy link
Member

@andersio andersio Nov 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is intentional IIRC. All composed properties derived from a property is considered just a reduced/transformed view of that property, so that the behaviour is in line with existentials which retain the wrapped property (like the one in your example).

That's said it might be worth discussing if the source capturing should be dropped. It could be an oversight (especially with the arguments I made in #58 (comment)... ehm), since in ReactiveCocoa/ReactiveCocoa#2922 @mdiep and I focused just on how signal and producer should behave.

Edit: Edited many times.

Edit 2: It seems the capturing is originated from ReactiveCocoa/ReactiveCocoa#2788. The argument to capture was different though, and it was actually invalidated by ReactiveCocoa/ReactiveCocoa#2922...

Edit 3: Opened a PR. #117

Copy link
Member

@andersio andersio Nov 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#117 is merged. Now this is the last stop before RC1 (hopefully). 🎉

The semantics of `Atomic.modify` have changed such that the return value
of the closure is no longer the old state, and the state is instead
modified by assigning to the `inout` parameter.
When `shouldThrottle.producer` is started, the observer will be executed
synchronously first time. This makes the initial check redundant,
because the value will be updated as soon as `shouldThrottle` is
observed.
///
/// - note: If `shouldThrottle` completes before the receiver, and its last
/// value is `true`, the returned signal will remain in the throttled
/// state, emitting no further values until it terminates.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4 notes excessive? 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps just two notes (shouldThrottle + termination behavior), or no notes at all?

@sharplet
Copy link
Contributor Author

That should be everything, please take a look at the latest 2 commits!

@sharplet
Copy link
Contributor Author

Streamlined to 3 notes in the doc comments.

@sharplet
Copy link
Contributor Author

Green 👍

@andersio
Copy link
Member

:shipit:

@mdiep mdiep merged commit be6f0ad into master Nov 29, 2016
@mdiep mdiep deleted the as-throttle-while branch November 29, 2016 13:10
@sharplet
Copy link
Contributor Author

🎉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants